package net.quanter.shield.mq.rocketmq.consumer;


import lombok.extern.slf4j.Slf4j;
import net.quanter.shield.mq.MQConsumer;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.RunType;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.utils.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * {@link MQConsumer} implementation built on the community RocketMQ client's
 * {@link DefaultMQPushConsumer}.
 *
 * <p>All {@link RocketMQConsumerParam}s passed to the constructor share one push consumer
 * and one {@link MessageListenerConcurrently}. Subscriptions are merged per topic, and every
 * incoming message is routed to each parameter whose topic and tags match it.
 */
@Slf4j
public class RocketMQCommunityConsumer implements MQConsumer {

    final RocketMQBorkerParam mqConnectVO;
    private final String groupId;
    private final DefaultMQPushConsumer consumer;
    private final RocketMQConsumerParam[] mqConsumerParams;

    /**
     * Creates the push consumer, subscribes it to the merged topic/tag expressions and
     * registers the shared message listener. The consumer is not started here; call
     * {@link #start(RunType)}.
     *
     * @param mqConnectVO      connection parameters; its end point is used as the name server address
     * @param groupId          consumer group passed to {@link DefaultMQPushConsumer}
     * @param mqConsumerParams topic/tag/listener bindings served by this consumer
     * @throws MQClientException if subscribing to a topic fails
     */
    public RocketMQCommunityConsumer(
            RocketMQBorkerParam mqConnectVO,
            String groupId,
            RocketMQConsumerParam... mqConsumerParams) throws MQClientException {

        this.mqConnectVO = mqConnectVO;
        this.groupId = groupId;
        this.mqConsumerParams = mqConsumerParams;
        consumer = new DefaultMQPushConsumer(groupId);
        consumer.setNamesrvAddr(mqConnectVO.getEndPoint());

        /*
         * The annoying part of the community edition: after subscribing to several topics
         * and tags, every message is delivered to one single listener, so the filtering by
         * tag has to be done inside the MessageListener.
         */
        Map<String, String> topicTagMap = mergeSubscriptions(mqConsumerParams);
        for (Map.Entry<String, String> entry : topicTagMap.entrySet()) {
            consumer.subscribe(entry.getKey(), entry.getValue());
        }

        consumer.registerMessageListener(
                new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        boolean allResult = true;
                        for (MessageExt message : list) {
                            // Keep going after a failure so every message in the list is still dispatched.
                            if (!dispatch(message)) {
                                allResult = false;
                            }
                        }
                        // One status is returned for the whole list: a single failure marks all of it for retry.
                        return allResult ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
        );
    }

    /**
     * Collapses the consumer parameters into one tag expression per topic.
     *
     * <p>The first tag string seen for a topic is used as is. Later ones for the same topic
     * are appended with {@code "||"}, unless either side is {@code "*"}, in which case the
     * topic's expression becomes (or stays) {@code "*"}.
     *
     * @param params the bindings to merge
     * @return a map from topic name to the merged tag expression
     */
    private static Map<String, String> mergeSubscriptions(RocketMQConsumerParam[] params) {
        Map<String, String> topicTagMap = new HashMap<>();
        for (RocketMQConsumerParam param : params) {
            String topic = param.getTopic().getName();
            String tagString = param.getTagString();
            if (!topicTagMap.containsKey(topic)) {
                topicTagMap.put(topic, tagString);
                continue;
            }
            String merged = topicTagMap.get(topic);
            if ("*".equals(merged)) {
                // Already subscribed to everything on this topic.
                continue;
            }
            if ("*".equals(tagString)) {
                topicTagMap.put(topic, "*");
            } else {
                topicTagMap.put(topic, merged + "||" + tagString);
            }
        }
        return topicTagMap;
    }

    /**
     * Hands one message to every consumer parameter whose topic and tags match it.
     *
     * @param message the message received from the broker
     * @return {@code true} only if every matching listener returned {@code true} without throwing
     */
    private boolean dispatch(MessageExt message) {
        String topic = message.getTopic();
        String tags = message.getTags();
        boolean allResult = true;
        for (RocketMQConsumerParam consumerParam : mqConsumerParams) {
            if (!(consumerParam.getTopic().nameEquals(topic) && consumerParam.containTag(tags))) {
                continue;
            }
            // Captured inside the try so the error log below never has to call back into the
            // listener chain, which may be exactly what failed.
            Type messageType = null;
            try {
                messageType = consumerParam.getListener().getMessageType();
                MQMessageVO messageVO = fromRocketTcpMessage(messageType, message);
                boolean result = consumerParam.getListener().process(messageVO);
                if (!result) {
                    // If any listener reports failure, the overall result is a failure.
                    allResult = false;
                }
            } catch (Throwable e) {
                // Covers both body conversion and listener failures; the message text is kept as is.
                log.error("fromRocketTcpMessage error,MessageType={},topic={},tags={}",
                        messageType, topic, tags, e);
                allResult = false;
            }
        }
        return allResult;
    }

    /**
     * Converts a RocketMQ {@link MessageExt} into an {@link MQMessageVO}.
     *
     * <p>Copies topic, tags, reconsume count, born/store timestamps, commit-log and queue
     * offsets, the message properties (when present) and the message id, then sets the body
     * to the result of {@link ObjectUtils#byteArrayToObject} for the given type.
     *
     * @param type    target type handed to {@link ObjectUtils#byteArrayToObject}
     * @param message the RocketMQ message to convert
     * @param <T>     payload type expected by the caller
     * @return the populated message value object
     */
    // The value object is built through its raw type, as the body is only known as Object here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <T> MQMessageVO<T> fromRocketTcpMessage(Type type, MessageExt message) {
        MQMessageVO mqMessageVO = new MQMessageVO();
        mqMessageVO.setTopic(message.getTopic());
        mqMessageVO.setTag(message.getTags());
        mqMessageVO.setConsumedTimes(message.getReconsumeTimes());
        mqMessageVO.setPublishTime(message.getBornTimestamp());
        mqMessageVO.setFirstConsumeTime(message.getStoreTimestamp());
        mqMessageVO.setOffset(message.getCommitLogOffset());
        mqMessageVO.setQueueOffset(message.getQueueOffset());
        if (message.getProperties() != null) {
            mqMessageVO.putAll(message.getProperties());
        }
        mqMessageVO.setMessageId(message.getMsgId());
        byte[] bytes = message.getBody();
        Object body = ObjectUtils.byteArrayToObject(type, bytes);
        mqMessageVO.setObj(body);
        return mqMessageVO;
    }

    /**
     * Starts the underlying push consumer.
     *
     * @param runType not used by this implementation
     * @throws MQClientException if the consumer fails to start
     */
    @Override
    public void start(RunType runType) throws MQClientException {
        consumer.start();
    }

    /** Shuts down the underlying push consumer. */
    @Override
    public void stop() {
        consumer.shutdown();
    }
}
